{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "(dask-overview)=\n",
    "# Dask distributed runtime\n",
    "\n",
    "<img src=\"../_static/images/dask_horizontal.svg\" width=\"300\"> <br>\n",
    "\n",
    "```{admonition} Quick Links\n",
    "* [Running Dask Over MLRun](./dask-mlrun.html)\n",
    "* [Pipelines Using Dask, Kubeflow and MLRun](./dask-pipeline.html)\n",
    "```\n",
    "\n",
    "## Dask overview\n",
    "Source: [Dask docs](https://docs.dask.org/en/latest/)<br>\n",
    "Dask is a flexible library for parallel computing in Python.\n",
    "\n",
    "Dask is composed of two parts:\n",
    "\n",
    "1. **Dynamic task scheduling** optimized for computation. This is similar to Airflow, Luigi, Celery, or Make, but optimized for interactive computational workloads.\n",
    "2. **“Big Data” collections** like parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments. These parallel collections run on top of dynamic task schedulers.\n",
    "\n",
    "**Dask emphasizes the following virtues:**\n",
    "\n",
    "* **Familiar**: Provides parallelized NumPy array and Pandas DataFrame objects<br>\n",
    "* **Flexible**: Provides a task scheduling interface for more custom workloads and integration with other projects.<br>\n",
    "* **Native**: Enables distributed computing in pure Python with access to the PyData stack.<br>\n",
    "* **Fast**: Operates with low overhead, low latency, and minimal serialization necessary for fast numerical algorithms<br>\n",
    "* **Scales up**: Runs resiliently on clusters with 1000s of cores<br>\n",
    "* **Scales down**: Trivial to set up and run on a laptop in a single process<br>\n",
    "* **Responsive**: Designed with interactive computing in mind, it provides rapid feedback and diagnostics to aid humans<br>\n",
    "Dask collections and schedulers\n",
    "\n",
    "<img src=\"../_static/images/dask-overview.svg\" width=\"800\"><br>\n",
    "\n",
    "## Dask DataFrame mimics Pandas\n",
    "\n",
    "```python\n",
    "import pandas as pd                     import dask.dataframe as dd\n",
    "df = pd.read_csv('2015-01-01.csv')      df = dd.read_csv('2015-*-*.csv')\n",
    "df.groupby(df.user_id).value.mean()     df.groupby(df.user_id).value.mean().compute()\n",
    "\n",
    "```\n",
    "Dask Array mimics NumPy - documentation\n",
    "\n",
    "```python\n",
    "import numpy as np                       import dask.array as da\n",
    "f = h5py.File('myfile.hdf5')             f = h5py.File('myfile.hdf5')\n",
    "x = np.array(f['/small-data'])           x = da.from_array(f['/big-data'],\n",
    "                                                           chunks=(1000, 1000))\n",
    "x - x.mean(axis=1)                       x - x.mean(axis=1).compute()\n",
    "```\n",
    "\n",
    "Dask Bag mimics iterators, Toolz, and PySpark - documentation\n",
    "\n",
    "```python\n",
    "import dask.bag as db\n",
    "b = db.read_text('2015-*-*.json.gz').map(json.loads)\n",
    "b.pluck('name').frequencies().topk(10, lambda pair: pair[1]).compute()\n",
    "```\n",
    "\n",
    "Dask Delayed mimics for loops and wraps custom code - documentation\n",
    "\n",
    "```python\n",
    "from dask import delayed\n",
    "L = []\n",
    "for fn in filenames:                  # Use for loops to build up computation\n",
    "    data = delayed(load)(fn)          # Delay execution of function\n",
    "    L.append(delayed(process)(data))  # Build connections between variables\n",
    "\n",
    "result = delayed(summarize)(L)\n",
    "result.compute()\n",
    "```\n",
    "\n",
    "The concurrent.futures interface provides general submission of custom tasks: - documentation\n",
    "\n",
    "```python\n",
    "from dask.distributed import Client\n",
    "client = Client('scheduler:port')\n",
    "\n",
    "futures = []\n",
    "for fn in filenames:\n",
    "    future = client.submit(load, fn)\n",
    "    futures.append(future)\n",
    "\n",
    "summary = client.submit(summarize, futures)\n",
    "summary.result()\n",
    "```\n",
    "\n",
    "## Dask.distributed<br>\n",
    "`Dask.distributed` is a lightweight library for distributed computing in Python. It extends both the concurrent.futures and dask APIs to moderate sized clusters.<br>\n",
    "\n",
    "<img src=\"../_static/images/dask_dist.png\" width=\"800\"><br>\n",
    "\n",
    "### Motivation\n",
    "Distributed serves to complement the existing PyData analysis stack. In particular it meets the following needs:\n",
    "\n",
    "* **Low latency**: Each task suffers about 1ms of overhead. A small computation and network roundtrip can complete in less than 10ms.<br>\n",
    "* **Peer-to-peer data sharing**: Workers communicate with each other to share data. This removes central bottlenecks for data transfer.<br>\n",
    "* **Complex Scheduling**: Supports complex workflows (not just map/filter/reduce) which are necessary for sophisticated algorithms used in nd-arrays, machine learning, image processing, and statistics.<br>\n",
    "* **Pure Python**: Built in Python using well-known technologies. This eases installation, improves efficiency (for Python users), and simplifies debugging.<br>\n",
    "* **Data Locality**: Scheduling algorithms cleverly execute computations where data lives. This minimizes network traffic and improves efficiency.<br>\n",
    "* **Familiar APIs**: Compatible with the concurrent.futures API in the Python standard library. Compatible with dask API for parallel algorithms<br>\n",
    "* **Easy Setup**: As a Pure Python package distributed is pip installable and easy to set up on your own cluster.<br>\n",
    "\n",
    "### Architecture\n",
    "`Dask.distributed` is a centrally managed, distributed, dynamic task scheduler. The central `dask-scheduler` process coordinates the actions of several `dask-worker` processes spread across multiple machines and the concurrent requests of several clients.\n",
    "\n",
    "The scheduler is asynchronous and event driven, simultaneously responding to requests for computation from multiple clients and tracking the progress of multiple workers. The event-driven and asynchronous nature makes it flexible to concurrently handle a variety of workloads coming from multiple users at the same time while also handling a fluid worker population with failures and additions. Workers communicate amongst each other for bulk data transfer over TCP.\n",
    "\n",
    "Internally the scheduler tracks all work as a constantly changing directed acyclic graph of tasks. A task is a Python function operating on Python objects, which can be the results of other tasks. This graph of tasks grows as users submit more computations, fills out as workers complete tasks, and shrinks as users leave or become disinterested in previous results.\n",
    "\n",
    "Users interact by connecting a local Python session to the scheduler and submitting work, either by individual calls to the simple interface `client.submit(function, *args, **kwargs)` or by using the large data collections and parallel algorithms of the parent dask library. The collections in the dask library like `dask.array` and `dask.dataframe` provide easy access to sophisticated algorithms and familiar APIs like NumPy and Pandas, while the simple `client.submit` interface provides users with custom control when they want to break out of canned “big data” abstractions and submit fully custom workloads."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ~5X Faster with Dask\n",
    "\n",
    "Short example which demonstrates the power of Dask, in this notebook we will preform the following:\n",
    "* Generate random text files\n",
    "* Process the file by sorting and counting it's content\n",
    "* Compare run times"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Generate random text files"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import random\n",
    "import string\n",
    "import os\n",
    "\n",
    "from collections import Counter\n",
    "from dask.distributed import Client\n",
    "\n",
    "import warnings\n",
    "\n",
    "warnings.filterwarnings(\"ignore\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "def generate_big_random_letters(filename, size):\n",
    "    \"\"\"\n",
    "    generate big random letters/alphabets to a file\n",
    "    :param filename: the filename\n",
    "    :param size: the size in bytes\n",
    "    :return: void\n",
    "    \"\"\"\n",
    "    chars = \"\".join([random.choice(string.ascii_letters) for i in range(size)])  # 1\n",
    "\n",
    "    with open(filename, \"w\") as f:\n",
    "        f.write(chars)\n",
    "    pass"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "PATH = \"/User/howto/dask/random_files\"\n",
    "SIZE = 10000000\n",
    "\n",
    "for i in range(100):\n",
    "    generate_big_random_letters(filename=PATH + \"/file_\" + str(i) + \".txt\", size=SIZE)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Setfunction for benchmark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "def count_letters(path):\n",
    "    \"\"\"\n",
    "    count letters in text file\n",
    "    :param path:  path to file\n",
    "    \"\"\"\n",
    "    # open file in read mode\n",
    "    file = open(path, \"r\")\n",
    "\n",
    "    # read the content of file\n",
    "    data = file.read()\n",
    "\n",
    "    # sort file\n",
    "    sorted_file = sorted(data)\n",
    "\n",
    "    # count file\n",
    "    number_of_characters = len(sorted_file)\n",
    "\n",
    "    return number_of_characters"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "def process_files(path):\n",
    "    \"\"\"\n",
    "    list file and count letters\n",
    "    :param path: path to folder with files\n",
    "    \"\"\"\n",
    "    num_list = []\n",
    "    files = os.listdir(path)\n",
    "\n",
    "    for file in files:\n",
    "        cnt = count_letters(os.path.join(path, file))\n",
    "        num_list.append(cnt)\n",
    "\n",
    "    l = num_list\n",
    "    return print(\"done!\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Sort & count number of letters with Python"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "done!\n",
      "CPU times: user 2min 19s, sys: 9.31 s, total: 2min 29s\n",
      "Wall time: 2min 32s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "PATH = \"/User/howto/dask/random_files/\"\n",
    "process_files(PATH)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Sort & count number of letters with Dask"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "# get the dask client address\n",
    "client = Client()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "# list all files in folder\n",
    "files = [PATH + x for x in os.listdir(PATH)]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 13.2 ms, sys: 983 µs, total: 14.2 ms\n",
      "Wall time: 12.2 ms\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "# run the count_letter function on a list of files while using multiple workers\n",
    "a = client.map(count_letters, files)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 3.39 s, sys: 533 ms, total: 3.92 s\n",
      "Wall time: 40 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "# gather results\n",
    "l = client.gather(a)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "## Additional topics\n",
    "\n",
    "```{toctree}\n",
    ":maxdepth: 1\n",
    "dask-mlrun\n",
    "dask-pipeline\n",
    "```\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
